// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! SST bench.

use std::{cmp, sync::Arc, time::Instant};

use analytic_engine::{
    sst::{
        factory::{Factory, FactoryImpl, ObjectStorePickerRef, ScanOptions, SstReadHint},
        meta_data::cache::{MetaCache, MetaCacheRef},
        metrics::MaybeTableLevelMetrics as SstMaybeTableLevelMetrics,
    },
    ScanType, SstReadOptionsBuilder,
};
use common_types::{
    projected_schema::{ProjectedSchema, RowProjectorBuilder},
    schema::Schema,
};
use logger::info;
use object_store::{LocalFileSystem, ObjectStoreRef, Path};
use runtime::Runtime;

use crate::{config::SstBenchConfig, util};

pub struct SstBench {
    store: ObjectStoreRef,
    pub sst_file_name: String,
    max_projections: usize,
    schema: Schema,
    projected_schema: Option<ProjectedSchema>,
    sst_read_options_builder: SstReadOptionsBuilder,
    runtime: Arc<Runtime>,
}

impl SstBench {
    pub fn new(config: SstBenchConfig) -> Self {
        let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));

        let store = Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
        let sst_path = Path::from(config.sst_file_name.clone());
        let meta_cache: Option<MetaCacheRef> = config
            .sst_meta_cache_cap
            .map(|cap| Arc::new(MetaCache::new(cap)));
        let schema = runtime.block_on(util::schema_from_sst(&store, &sst_path, &meta_cache));
        let predicate = config.predicate.into_predicate();
        let projected_schema = ProjectedSchema::no_projection(schema.clone());
        let scan_options = ScanOptions {
            background_read_parallelism: 1,
            max_record_batches_in_flight: 1024,
            num_streams_to_prefetch: 0,
        };
        let maybe_table_level_metrics = Arc::new(SstMaybeTableLevelMetrics::new("bench"));
        let sst_read_options_builder = SstReadOptionsBuilder::new(
            ScanType::Query,
            scan_options,
            maybe_table_level_metrics,
            config.num_rows_per_row_group,
            predicate,
            meta_cache,
            runtime.clone(),
        );
        let max_projections = cmp::min(config.max_projections, schema.num_columns());

        SstBench {
            store,
            sst_file_name: config.sst_file_name,
            max_projections,
            schema,
            projected_schema: Some(projected_schema),
            sst_read_options_builder: sst_read_options_builder.clone(),
            runtime,
        }
    }

    pub fn num_benches(&self) -> usize {
        // One test reads all columns and `max_projections` tests read with projection.
        1 + self.max_projections
    }

    pub fn init_for_bench(&mut self, i: usize) {
        let projected_schema =
            util::projected_schema_by_number(&self.schema, i, self.max_projections);

        self.projected_schema = Some(projected_schema);
    }

    pub fn run_bench(&self) {
        let sst_path = Path::from(self.sst_file_name.clone());

        let sst_factory = FactoryImpl;
        let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());

        let fetched_schema = self.projected_schema.as_ref().unwrap().to_record_schema();
        let table_schema = self
            .projected_schema
            .as_ref()
            .unwrap()
            .table_schema()
            .clone();
        let row_projector_builder = RowProjectorBuilder::new(fetched_schema, table_schema, None);
        let sst_read_options = self
            .sst_read_options_builder
            .clone()
            .build(row_projector_builder);
        self.runtime.block_on(async {
            let mut sst_reader = sst_factory
                .create_reader(
                    &sst_path,
                    &sst_read_options,
                    SstReadHint::default(),
                    &store_picker,
                    None,
                )
                .await
                .unwrap();
            let begin_instant = Instant::now();
            let mut sst_stream = sst_reader.read().await.unwrap();

            let mut total_rows = 0;
            let mut batch_num = 0;
            while let Some(batch) = sst_stream.fetch_next().await {
                let num_rows = batch.unwrap().num_rows();
                total_rows += num_rows;
                batch_num += 1;
            }

            info!(
                "\nSstBench total rows of sst: {}, total batch num: {}, cost: {:?}",
                total_rows,
                batch_num,
                begin_instant.elapsed(),
            );
        });
    }
}
